/**
 * Copyright [2020] [LiBo/Alex of copyright liboware@gmail.com ]
 * <p>
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.hyts.stream.engine.stream;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Properties;

/**
 * Placeholder for a streaming job that aggregates user-auth statistics.
 * <p>
 * The class currently declares no fields or methods. Its body only contains a
 * commented-out {@code main} method that sketches the intended pipeline:
 * read JSON strings from a Kafka topic, drop records whose {@code ts} field is
 * malformed or too far in the future, assign event-time timestamps and
 * watermarks, aggregate over one-hour tumbling event-time windows (with two
 * hours of allowed lateness) and write the result to a MySQL sink.
 * <p>
 * The sketch refers to types that are not imported by this file
 * (e.g. {@code FlinkKafkaConsumer}, {@code UserAuth}, {@code TaskTimestamp},
 * {@code UserAuthCount}, {@code SinkToMySQL}), so it cannot be re-enabled
 * without first restoring those imports/classes.
 *
 * @project-name:lhy-stream
 * @package-name:com.hyts.stream.engine.stream
 * @author:LiBo/Alex
 * @create-date:2022-05-14 16:30
 * @copyright:libo-alex4java
 * @email:liboware@gmail.com
 * @description: placeholder for the user-auth statistics stream job (implementation currently commented out)
 */
public class StatisticDataStreamExecutor {

    // Disabled sketch of the job entry point; kept for reference only.
//    public static void main(String[] args) throws Exception {
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);// use event time as the time characteristic
//        env.enableCheckpointing(500);// enable checkpointing
//        Properties properties = Property.getKafkaProperties(Constants.GET_USER_AUTH_NAME);
//        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<String>(Constants.TOPIC_USER_AUTH, new SimpleStringSchema(), properties);
//        consumer.setStartFromGroupOffsets();
//        DataStream<String> edits = env.addSource(consumer).name("user_auth_edit");
//        DataStream<UserAuth> calStream = edits
//                .filter(new FilterFunction<String>() {  // filter out records with abnormal timestamps so they cannot stall processing
//                    @Override
//                    public boolean filter(String s) throws Exception {
//                        try {
//                            JSONObject jsonObject = JSONObject.parseObject(s);
//                            String ts = jsonObject.getString("ts");
//                            Pattern resultPattern = Pattern.compile("\\d{10}");
//                            Matcher resultMatcher = resultPattern.matcher(ts);
//                            // Drop records whose timestamp is later than now, so that bogus future data
//                            // does not affect the time windows. Tolerate a one-hour time difference.
//                            // NOTE(review): the comment says one hour, but the code below adds
//                            // 30 * 60 seconds (30 minutes) -- confirm which tolerance is intended.
//                            if (!resultMatcher.matches()) {
//                                return false;
//                            }
//                            int now = (int) (System.currentTimeMillis() / 1000) + 30 * 60;// add slack in case the server clock is inaccurate
//                            int time = Integer.parseInt(ts);
//                            return now >= time;
//                        } catch (Exception e) {
//                            System.out.println("Filter failed");
//                            System.err.println(e + " " + e.getMessage());
//                            return false;
//                        }
//                    }
//                })
//                .assignTimestampsAndWatermarks(new TaskTimestamp(Time.hours(0)))
//                .windowAll(TumblingEventTimeWindows.of(Time.hours(1)))
//                .allowedLateness(Time.hours(2))
//                .aggregate(new UserAuthCount());
//        calStream.addSink(new SinkToMySQL());
//        env.execute("executed user auth");
//        System.out.println("executed");
//    }

}
